Python: (core): Add functional workflow API #4238
moonbox3 wants to merge 8 commits into microsoft:main
Conversation
Python Test Coverage Report •
Python Unit Test Overview
Pull request overview
This PR introduces a functional workflow API as an alternative to the existing graph-based workflow API. The functional approach allows users to write workflows as plain async functions decorated with @workflow, using native Python control flow (if/else, loops, asyncio.gather) instead of explicit graph construction with executors and edges. The @step decorator is optional and provides per-step checkpointing, caching, and observability.
Changes:
- Added core implementation (`_functional.py`) with `@workflow` and `@step` decorators, plus `RunContext`, `FunctionalWorkflow`, and `FunctionalWorkflowAgent` classes
- Added comprehensive test suite (40+ test cases covering basic execution, HITL, checkpointing, streaming, error handling, edge cases)
- Added 6 sample files demonstrating functional workflows (basic pipeline, streaming, parallel execution, checkpointing, HITL, agent integration)
- Restructured getting-started samples to introduce functional workflows before graph workflows
- Updated exports in `__init__.py` to expose new functional API symbols
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `python/packages/core/agent_framework/_workflows/_functional.py` | Core implementation of the functional workflow API with `RunContext`, `StepWrapper`, `FunctionalWorkflow`, and `FunctionalWorkflowAgent` classes (1105 lines) |
| `python/packages/core/agent_framework/__init__.py` | Added exports for `FunctionalWorkflow`, `FunctionalWorkflowAgent`, `RunContext`, `StepWrapper`, `step`, and `workflow` |
| `python/packages/core/tests/workflow/test_functional_workflow.py` | Comprehensive test suite covering basic execution, events, parallelism, HITL, errors, streaming, state, checkpointing, control flow, and edge cases (1031 lines) |
| `python/samples/01-get-started/05_first_functional_workflow.py` | Getting-started sample demonstrating a basic functional workflow with plain async functions |
| `python/samples/01-get-started/06_first_graph_workflow.py` | Renamed and updated graph workflow sample (previously `05_first_workflow.py`) |
| `python/samples/01-get-started/07_host_your_agent.py` | Renamed agent hosting sample (previously `06_host_your_agent.py`) |
| `python/samples/01-get-started/README.md` | Updated sample listing to include both functional and graph workflow samples |
| `python/samples/03-workflows/functional/basic_pipeline.py` | Sample showing the simplest sequential pipeline with the `@workflow` decorator |
| `python/samples/03-workflows/functional/basic_streaming_pipeline.py` | Sample demonstrating streaming workflow events with `run(stream=True)` |
| `python/samples/03-workflows/functional/parallel_pipeline.py` | Sample showing fan-out/fan-in with `asyncio.gather` |
| `python/samples/03-workflows/functional/steps_and_checkpointing.py` | Sample explaining the `@step` decorator for per-step checkpointing and observability |
| `python/samples/03-workflows/functional/hitl_review.py` | Sample demonstrating HITL with `ctx.request_info()` and resume |
| `python/samples/03-workflows/functional/agent_integration.py` | Sample showing agent calls inside workflows and the `.as_agent()` wrapper |
| `python/samples/03-workflows/README.md` | Added functional workflow section to samples overview |
```python
@workflow
async def data_pipeline(url: str) -> str:
    """A simple sequential data pipeline."""
    raw = await fetch_data(url)
```
I think it would be useful to also demonstrate that, because this is just a function, you do not have to wrap everything in steps; you can do some of the manipulation as plain code between steps, which makes it a lot simpler.
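To illustrate that point, here is a self-contained sketch with trivial stand-in decorators (the real `@workflow`/`@step` come from the framework and are not imported here; `summarize` and the decorator bodies are illustrative), where a cheap transform sits between steps as ordinary code:

```python
import asyncio

def workflow(fn):  # stand-in: the real decorator returns a FunctionalWorkflow
    return fn

def step(fn):  # stand-in: the real decorator adds checkpointing/caching
    return fn

@step
async def fetch_data(url: str) -> str:
    return f"raw:{url}"

@step
async def summarize(text: str) -> str:
    return f"summary({text})"

@workflow
async def data_pipeline(url: str) -> str:
    raw = await fetch_data(url)
    # Plain code between steps: cheap transforms need no decorator at all.
    cleaned = raw.strip().lower()
    if not cleaned:
        return "nothing to summarize"
    return await summarize(cleaned)

print(asyncio.run(data_pipeline("HTTP://Example.com")))
```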
```python
return f"Draft document about '{topic}': Lorem ipsum dolor sit amet..."
```

```python
@step
```
How does this @step compare to @handler? Is there a lot of overlap, and could we reuse steps in graphs, or handlers here?
I want them to be conceptually different things. A @handler handles a message routed to it by the graph: it's reactive, tied to the executor contract. A @step marks a function call in a sequential flow: it's proactive, just "I called this function as step N." Different mental models, different names.
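A minimal, self-contained contrast of the two mental models; `UpperCaseExecutor` and `upper_case_step` are hypothetical stand-ins, not framework classes:

```python
import asyncio

# Reactive: in the graph API, the runtime routes a message to a handler.
class UpperCaseExecutor:
    async def handle(self, message: str) -> str:  # invoked by the runtime
        return message.upper()

# Proactive: in the functional API, a step is an ordinary function.
async def upper_case_step(text: str) -> str:
    return text.upper()

async def main() -> str:
    # The workflow author drives the sequence explicitly, calling steps
    # in order instead of declaring edges between executors.
    return await upper_case_step("hello")

print(asyncio.run(main()))
```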
```python
print(f"State: {result1.get_final_state()}")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
```
What would happen here if state != WorkflowRunState.IDLE_WITH_PENDING_REQUESTS?
If request_info() was never reached (an early return), the workflow completes normally with state IDLE and get_request_info_events() returns an empty list. Added a comment in the sample clarifying this.
So would `requests` be None, or would the `get...` call raise?
get_request_info_events() returns an empty list. It's a filter over the event stream, so no requests means []. No exception raised.
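A self-contained sketch of that filter-over-events behavior; the event and result classes here are illustrative stand-ins, not the framework's actual types:

```python
from dataclasses import dataclass

@dataclass
class RequestInfoEvent:
    payload: dict

@dataclass
class OutputEvent:
    value: str

class RunResult:
    def __init__(self, events):
        self._events = list(events)

    def get_request_info_events(self):
        # A filter over the collected event stream: no matches means [].
        return [e for e in self._events if isinstance(e, RequestInfoEvent)]

# A run where request_info() was never reached: only output events exist,
# so the getter returns an empty list, never None, and never raises.
result = RunResult([OutputEvent("done")])
assert result.get_request_info_events() == []
```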
```
``asyncio.gather``) instead of a graph-based topology.

A ``@workflow``-decorated async function receives its input as the first
positional argument. If the function needs HITL (``request_info``), custom
```
Should it be:

```diff
- positional argument. If the function needs HITL (``request_info``), custom
+ positional argument. If a step needs HITL (``request_info``), custom
```

?
The docstring is describing the @workflow-decorated function, not a @step. The workflow function is what receives RunContext and calls request_info(). A @step doesn't receive context — so "If the function needs HITL" is referring to the workflow function.
```python
class RunContext:
    """Execution context injected into ``@workflow`` functions.

    Every ``@workflow`` invocation receives a ``RunContext`` instance that
```
I am a bit confused about which entity receives the context. Is it the workflow, the steps, or both?
Only the @workflow function receives RunContext. Steps don't — they're plain async functions with caching. If a step needs HITL input, the workflow calls request_info() and passes the result to the step as an argument.
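A sketch of that division of labor, with a trivial stand-in `RunContext` (the real one pauses the run for human input rather than answering immediately; the function names are illustrative):

```python
import asyncio

class RunContext:  # stand-in: real impl suspends the run until input arrives
    async def request_info(self, prompt, response_type=str):
        return response_type("approved")

async def apply_feedback(draft: str, feedback: str) -> str:
    # A step stays a plain function: human input arrives as an argument,
    # so it is testable in isolation with no framework dependency.
    return f"{draft} [{feedback}]"

async def review_pipeline(draft: str, ctx: RunContext) -> str:
    # Only the workflow function touches the context, then hands the
    # result to the step as an ordinary argument.
    feedback = await ctx.request_info({"draft": draft}, response_type=str)
    return await apply_feedback(draft, feedback)

print(asyncio.run(review_pipeline("v1", RunContext())))
```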
```python
@workflow
async def hitl_pipeline(data: str, ctx: RunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback
```
My brain maps @workflow to the graph-based Workflow and @step to Executor. I can see the benefit of allowing request_info at the workflow level. It's kind of like an executor whose sole purpose is to get user feedback. But should we also allow request_info inside a @step?
By design, request_info lives at the workflow level — the workflow is the orchestrator that decides when to pause for input. Steps are meant to be self-contained units of work. If a step needs human input, the workflow calls request_info() first and passes the result to the step. This keeps steps simple and testable in isolation (no framework dependency).
```python
def __init__(self, func: Callable[..., Awaitable[R]], *, name: str | None = None) -> None:
    if not inspect.iscoroutinefunction(func):
        raise TypeError(
            f"@step can only decorate async functions, but '{func.__name__}' is not a coroutine function."
```
This is probably not super important, but should we also support non-async methods?
We could, but it adds complexity for limited benefit — the workflow itself is async, so wrapping a sync function in asyncio.to_thread() or similar is straightforward for users. For now, @step requires async. We can revisit if users ask for it.
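For reference, a sketch of the `asyncio.to_thread` workaround mentioned above; `checksum` is a hypothetical blocking function a user might want as a step:

```python
import asyncio
import hashlib

def checksum(data: bytes) -> str:  # sync, potentially blocking
    return hashlib.sha256(data).hexdigest()

async def checksum_step(data: bytes) -> str:
    # Runs the sync function in a worker thread, keeping the event loop
    # free; the resulting coroutine function satisfies @step's requirement.
    return await asyncio.to_thread(checksum, data)

print(asyncio.run(checksum_step(b"hello")))
```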
```python
functools.update_wrapper(self, func)  # type: ignore[arg-type]

# ------------------------------------------------------------------
# run() — same overloaded interface as graph Workflow
```
Should we extract these methods and make both workflow types get them for free?
Agree this is worth considering. The run() overload pattern and _finalize_events logic are similar between FunctionalWorkflow and graph Workflow. Worth a follow-up refactoring PR to extract a shared base, but out of scope for this one.
```python
self._last_step_cache = dict(ctx._step_cache)

# Yield collected events
for event in ctx._get_events():
```
Question: Is this true streaming?
It looks like all events have been produced at this point.
Currently events are collected during execution and yielded after. Within a single function execution there's no natural suspension point to stream intermediate events (unlike graph workflows where each superstep boundary is a yield point). However, @step events are collected incrementally, and the async generator yields them as they're produced. For true mid-function streaming, users can use ctx.add_event() — though these are still batched by the current implementation. This is a known limitation we can improve in a follow-up.
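A stripped-down sketch of that batching behavior: events produced inside a single function body all exist before the generator starts yielding, unlike a graph run that yields at each superstep boundary (names here are illustrative, not framework APIs):

```python
import asyncio

async def run_collect_then_yield():
    events = []

    async def body():
        # The whole function body runs to completion first...
        events.append("step:fetch")
        events.append("step:summarize")

    await body()
    # ...so by the time we yield, every event has already been produced.
    for e in events:
        yield e

async def main():
    return [e async for e in run_collect_then_yield()]

print(asyncio.run(main()))
```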
```python
cache_key = ctx._get_step_cache_key(self.name)
found, cached = ctx._get_cached_result(cache_key)
invocation_data = deepcopy({"args": args, "kwargs": kwargs}) if args or kwargs else None
if found:
```
Should we also check if the input arguments have the same values?
There could be scenarios where some steps are checkpointed and some are not (the ones without the decorator). If a checkpointed step depends on the output of a non-checkpointed step and its output changes, it may lead to incorrect results.
The cache key is (step_name, call_index) and relies on the documented determinism requirement — if the workflow function is deterministic w.r.t. step results, the arguments will always match on replay. Adding argument comparison would be defensive but expensive (deep equality on arbitrary objects) and could give false negatives for objects that don't implement __eq__.
This is by design and documented: workflow functions must be deterministic w.r.t. step results. If a non-@step function is nondeterministic and a cached @step depends on it, results can diverge. The fix is to use @step on any function whose output affects downstream cached steps. This tradeoff keeps the simple case simple.
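A self-contained sketch of the `(step_name, call_index)` keying scheme described above, with a plain dict standing in for the framework's checkpoint store:

```python
class StepCache:
    """Illustrative stand-in, not the framework's cache implementation."""

    def __init__(self):
        self._call_counts = {}

    def key_for(self, step_name: str) -> tuple:
        # Each invocation of the same step gets the next index, so a
        # deterministic workflow regenerates identical keys on replay.
        idx = self._call_counts.get(step_name, 0)
        self._call_counts[step_name] = idx + 1
        return (step_name, idx)

cache = StepCache()
assert cache.key_for("fetch") == ("fetch", 0)
assert cache.key_for("summarize") == ("summarize", 0)
assert cache.key_for("fetch") == ("fetch", 1)  # second call, new index
```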
```python
# Plain async functions — no decorators needed
async def to_upper_case(text: str) -> str:
```
nit: this sample will become even simpler if we can remove the asyncs.
Same as the sync support discussion — supporting sync functions would simplify the getting-started sample, but @workflow is inherently async (it runs an async event loop internally). We can consider sync support as a follow-up.
|
Btw, @eavanvalkenburg and @TaoChenOSU, I think it would be best to put this functional API into its own package. We want to get more signal around the APIs and their use before we deem it "GA worthy," IMO.
Motivation and Context
The functional API is a stepping stone between single-agent use and the full graph API. Users write workflows as plain async functions: no executor classes, no edges, no builder patterns. HITL resume and crash recovery are still supported.
A very basic example of the functional workflow API:
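The inline example did not survive extraction; below is a minimal, self-contained sketch consistent with the API described in this PR. The `FunctionalWorkflow`/`run()` stand-ins only mimic the described surface (the real `run()` returns a richer result object), and `fetch`/`summarize` are hypothetical helpers:

```python
import asyncio

class FunctionalWorkflow:  # stand-in for the real class exported by the PR
    def __init__(self, fn):
        self._fn = fn

    async def run(self, *args):
        return await self._fn(*args)

def workflow(fn):  # stand-in: the real decorator wraps fn in FunctionalWorkflow
    return FunctionalWorkflow(fn)

async def fetch(url: str) -> str:
    return f"<data from {url}>"

async def summarize(text: str) -> str:
    return f"summary of {text}"

@workflow
async def pipeline(url: str) -> str:
    raw = await fetch(url)       # plain async call
    return await summarize(raw)  # native control flow, no edges

print(asyncio.run(pipeline.run("https://example.com")))
```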
Note:

- `@step` is opt-in for functions where per-step checkpointing matters (for example, agent calls). Without `@step`, workflows still support HITL and checkpointing — functions just re-execute on resume.
- `ctx: RunContext` is only needed when you use HITL (`request_info`), custom events (`add_event`), or state (`get_state`/`set_state`). Otherwise, omit it for a cleaner signature.

Description
Contribution Checklist